{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Tce3stUlHN0L"
      },
      "source": [
        "##### Copyright 2018 The TensorFlow Authors.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "tuOe1ymfHZPu"
      },
      "outputs": [],
      "source": [
        "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "# https://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MfBg1C5NB3X0"
      },
      "source": [
        "# 使用 TensorFlow 进行分布式训练"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "r6P32iYYV27b"
      },
      "source": [
        "<table class=\"tfo-notebook-buttons\" align=\"left\">\n",
        "  <td><a target=\"_blank\" href=\"https://tensorflow.google.cn/guide/distributed_training\" class=\"\"><img src=\"https://tensorflow.google.cn/images/tf_logo_32px.png\" class=\"\">在 TensorFlow.org 上查看</a></td>\n",
        "  <td><a target=\"_blank\" href=\"https://colab.research.google.com/github/tensorflow/docs-l10n/blob/master/site/zh-cn/guide/distributed_training.ipynb\" class=\"\"><img src=\"https://tensorflow.google.cn/images/colab_logo_32px.png\">在 Google Colab 中运行 </a></td>\n",
        "  <td><a target=\"_blank\" href=\"https://github.com/tensorflow/docs-l10n/blob/master/site/zh-cn/guide/distributed_training.ipynb\" class=\"\"><img src=\"https://tensorflow.google.cn/images/GitHub-Mark-32px.png\">在 GitHub 上查看源代码</a></td>\n",
        "  <td><a href=\"https://storage.googleapis.com/tensorflow_docs/docs-l10n/site/zh-cn/guide/distributed_training.ipynb\" class=\"\"><img src=\"https://tensorflow.google.cn/images/download_logo_32px.png\">下载笔记本</a></td>\n",
        "</table>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "xHxb-dlhMIzW"
      },
      "source": [
        "## 概述\n",
        "\n",
        "`tf.distribute.Strategy` 是一个可在多个 GPU、多台机器或 TPU 上进行分布式训练的 TensorFlow API。使用此 API，您只需改动较少代码就能分布现有模型和训练代码。\n",
        "\n",
        "`tf.distribute.Strategy` 旨在实现以下目标：\n",
        "\n",
        "- 易于使用，支持多种用户（包括研究人员和 ML 工程师等）。\n",
        "- 提供开箱即用的良好性能。\n",
        "- 轻松切换策略。\n",
        "\n",
        "`tf.distribute.Strategy` 可用于 [Keras](https://tensorflow.google.cn/guide/keras) 等高级 API，也可用来分布自定义训练循环（以及，一般来说，使用 TensorFlow 的任何计算）。\n",
        "\n",
        "在 TensorFlow 2.x 中，您可以立即执行程序，也可以使用 [`tf.function`](function.ipynb) 在计算图中执行。虽然 `tf.distribute.Strategy` 对两种执行模式都支持，但使用 `tf.function` 效果最佳。建议仅将 Eager 模式用于调试，而 `TPUStrategy` 不支持此模式。尽管本指南大部分时间在讨论训练，但该 API 也可用于在不同平台上分布评估和预测。\n",
        "\n",
        "您在使用 `tf.distribute.Strategy` 时只需改动少量代码，因为我们修改了 TensorFlow 的底层组件，使其可感知策略。这些组件包括变量、层、优化器、指标、摘要和检查点。\n",
        "\n",
        "在本指南中，我们将介绍各种类型的策略，以及如何在不同情况下使用它们。\n",
        "\n",
        "注：为了更深入地理解这些概念，请观看[此深入演示](https://youtu.be/jKV53r9-H14)。如果您计划编写自己的训练循环，尤其建议您观看此视频。\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "EVOZFbNgXghB"
      },
      "outputs": [],
      "source": [
        "# Import TensorFlow\n",
        "import tensorflow as tf"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "eQ1QESxxEbCh"
      },
      "source": [
        "## 策略类型\n",
        "\n",
        "`tf.distribute.Strategy` 打算涵盖不同轴上的许多用例。目前已支持其中的部分组合，将来还会添加其他组合。其中一些轴包括：\n",
        "\n",
        "- *同步和异步训练*：这是通过数据并行进行分布式训练的两种常用方法。在同步训练中，所有工作进程都同步地对输入数据的不同片段进行训练，并且会在每一步中聚合梯度。在异步训练中，所有工作进程都独立训练输入数据并异步更新变量。通常情况下，同步训练通过全归约实现，而异步训练通过参数服务器架构实现。\n",
        "- *硬件平台*：您可能需要将训练扩展到一台机器上的多个 GPU 或一个网络中的多台机器（每台机器拥有 0 个或多个 GPU），或扩展到 Cloud TPU 上。\n",
        "\n",
        "要支持这些用例，有六种策略可选。在下一部分，我们将说明当前在 TF 2.2 的哪些场景中支持哪些策略。以下为快速概览：\n",
        "\n",
        "训练 API | MirroredStrategy | TPUStrategy | MultiWorkerMirroredStrategy | CentralStorageStrategy | ParameterServerStrategy\n",
        ":-- | :-- | :-- | :-- | :-- | :--\n",
        "**Keras API** | 支持 | 支持 | 实验性支持 | 实验性支持 | 计划于 2.3 后支持\n",
        "**自定义训练循环** | 支持 | 支持 | 实验性支持 | 实验性支持 | 计划于 2.3 后支持\n",
        "**Estimator API** | 有限支持 | 不支持 | 有限支持 | 有限支持 | 有限支持\n",
        "\n",
        "注：[实验性支持](https://tensorflow.google.cn/guide/versions#what_is_not_covered)指不保证该 API 的兼容性。\n",
        "\n",
        "注：对 Estimator 提供有限支持。基本训练和评估都是实验性的，而未实现高级功能（如基架）。如未涵盖某一用例，建议您使用 Keras 或自定义训练循环。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "DoQKKK8dtfg6"
      },
      "source": [
        "### MirroredStrategy\n",
        "\n",
        "`tf.distribute.MirroredStrategy` 支持在一台机器的多个 GPU 上进行同步分布式训练。该策略会为每个 GPU 设备创建一个副本。模型中的每个变量都会在所有副本之间进行镜像。这些变量将共同形成一个名为 `MirroredVariable` 的单个概念变量。这些变量会通过应用相同的更新彼此保持同步。\n",
        "\n",
        "高效的全归约算法用于在设备之间传递变量更新。全归约算法通过加总各个设备上的张量使其聚合，并使其在每个设备上可用。这是一种非常高效的融合算法，可以显著减少同步开销。根据设备之间可用的通信类型，可以使用的全归约算法和实现方法有很多。默认使用 NVIDIA NCCL 作为全归约实现。您可以选择我们提供的其他选项，也可以自己编写。\n",
        "\n",
        "以下是创建 `MirroredStrategy` 最简单的方法：\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "9Z4FMAY9ADxK"
      },
      "outputs": [],
      "source": [
        "mirrored_strategy = tf.distribute.MirroredStrategy()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "wldY4aFCAH4r"
      },
      "source": [
        "这会创建一个 `MirroredStrategy` 实例，该实例使用所有对 TensorFlow 可见的 GPU，并使用 NCCL 进行跨设备通信。\n",
        "\n",
        "如果您只想使用机器上的部分 GPU，您可以这样做："
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "nbGleskCACv_"
      },
      "outputs": [],
      "source": [
        "mirrored_strategy = tf.distribute.MirroredStrategy(devices=[\"/gpu:0\", \"/gpu:1\"])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "8-KDnrJLAhav"
      },
      "source": [
        "如果您想重写跨设备通信，可以通过提供 `tf.distribute.CrossDeviceOps` 的实例，使用 `cross_device_ops` 参数来实现。目前，除了默认选项 `tf.distribute.NcclAllReduce` 外，还有 `tf.distribute.HierarchicalCopyAllReduce` 和 `tf.distribute.ReductionToOneDevice` 两个选项。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "6-xIOIpgBItn"
      },
      "outputs": [],
      "source": [
        "mirrored_strategy = tf.distribute.MirroredStrategy(\n",
        "    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "kPEBCMzsGaO5"
      },
      "source": [
        "### TPUStrategy\n",
        "\n",
        "您可以使用 `tf.distribute.experimental.TPUStrategy` 在张量处理单元 (TPU) 上运行 TensorFlow 训练。TPU 是 Google 的专用 ASIC，旨在显著加速机器学习工作负载。您可通过 Google Colab、[TensorFlow Research Cloud](https://tensorflow.google.cn/tfrc) 和 [Cloud TPU](https://cloud.google.com/tpu) 平台进行使用。\n",
        "\n",
        "就分布式训练架构而言，`TPUStrategy` 和 `MirroredStrategy` 是一样的，即实现同步分布式训练。TPU 会在多个 TPU 核心之间实现高效的全归约和其他集合运算，并将其用于 `TPUStrategy`。\n",
        "\n",
        "下面演示了如何将 `TPUStrategy` 实例化：\n",
        "\n",
        "注：要在 Colab 中运行此代码，应将 TPU 作为 Colab 运行时。具体请参阅 [TensorFlow TPU 指南](https://tensorflow.google.cn/guide/tpu)。\n",
        "\n",
        "```\n",
        "cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(     tpu=tpu_address) tf.config.experimental_connect_to_cluster(cluster_resolver) tf.tpu.experimental.initialize_tpu_system(cluster_resolver) tpu_strategy = tf.distribute.experimental.TPUStrategy(cluster_resolver)\n",
        "```\n",
        "\n",
        "`TPUClusterResolver` 实例可帮助定位 TPU。在 Colab 中，您无需为其指定任何参数。\n",
        "\n",
        "如果要将其用于 Cloud TPU，您必须：\n",
        "\n",
        "- 在 `tpu` 参数中指定 TPU 资源的名称。\n",
        "- 在程序*开始*时显式地初始化 TPU 系统。这是使用 TPU 进行计算前的必须步骤。初始化 TPU 系统还会清除 TPU 内存，所以为了避免丢失状态，请务必先完成此步骤。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "8Xc3gyo0Bejd"
      },
      "source": [
        "### MultiWorkerMirroredStrategy\n",
        "\n",
        "`tf.distribute.experimental.MultiWorkerMirroredStrategy` 与 `MirroredStrategy` 非常相似。它实现了跨多个工作进程的同步分布式训练，而每个工作进程可能有多个 GPU。与 `MirroredStrategy` 类似，它也会跨所有工作进程在每个设备的模型中创建所有变量的副本。\n",
        "\n",
        "它使用 [CollectiveOps](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/ops/collective_ops.py) 作为多工作进程全归约通信方法，用于保持变量同步。集合运算是 TensorFlow 计算图中的单个运算，它可以根据硬件、网络拓扑和张量大小在 TensorFlow 运行期间自动选择全归约算法。\n",
        "\n",
        "它还实现了其他性能优化。例如，静态优化，可以将小张量上的多个全归约转化为大张量上较少的全归约。此外，我们还在为它设计插件架构，这样您将来就能以插件的形式使用针对您的硬件进行了更好优化的算法。请注意，集合运算还可以实现其他集合运算，比如广播和全收集。\n",
        "\n",
        "以下是创建 `MultiWorkerMirroredStrategy` 最简单的方法："
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "m3a_6ebbEjre"
      },
      "outputs": [],
      "source": [
        "multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "bt94JBvhEr4s"
      },
      "source": [
        "`MultiWorkerMirroredStrategy` 目前为您提供两种不同的集合运算实现方法。`CollectiveCommunication.RING` 通过将 gRPC 用作通信层来实现基于环的集合。`CollectiveCommunication.NCCL` 使用 [NVIDIA 的 NCCL](https://developer.nvidia.com/nccl) 来实现集合。`CollectiveCommunication.AUTO` 会将选择推迟到运行时。集合实现的最佳选择取决于 GPU 的数量和种类，以及集群中的网络互连。您可以通过以下方式来指定：\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "QGX_QAEtFQSv"
      },
      "outputs": [],
      "source": [
        "multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(\n",
        "    tf.distribute.experimental.CollectiveCommunication.NCCL)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "0JiImlw3F77E"
      },
      "source": [
        "与多 GPU 训练相比，多工作进程训练的一个主要差异是多工作进程的设置。`TF_CONFIG` 环境变量是在 TensorFlow 中为作为集群一部分的每个工作进程指定集群配置的标准方法。详细了解如何[设置 TF_CONFIG](#TF_CONFIG)。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "E20tG21LFfv1"
      },
      "source": [
        "注：此策略处于 [experimental](https://tensorflow.google.cn/guide/versions#what_is_not_covered) 阶段，我们目前正在进行改进，使其能够用于更多场景。敬请期待 API 的未来变化。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "45H0Wa8WKI8z"
      },
      "source": [
        "### CentralStorageStrategy\n",
        "\n",
        "`tf.distribute.experimental.CentralStorageStrategy` 也执行同步训练。变量不会被镜像，而是放在 CPU 上，且运算会复制到所有本地 GPU 。如果只有一个 GPU，则所有变量和运算都将被放在该 GPU 上。\n",
        "\n",
        "请通过以下代码，创建 `CentralStorageStrategy` 实例：\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "rtjZOyaoMWrP"
      },
      "outputs": [],
      "source": [
        "central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "KY1nJHNkMl7b"
      },
      "source": [
        "这会创建一个 `CentralStorageStrategy` 实例，该实例将使用所有可见的 GPU 和 CPU。在副本上对变量的更新将先进行聚合，然后再应用于变量。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "aAFycYUiNCUb"
      },
      "source": [
        "注：此策略处于 [experimental](https://tensorflow.google.cn/guide/versions#what_is_not_covered) 阶段，我们目前正在进行改进，使其能够用于更多场景。敬请期待 API 的未来变化。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "3ZLBhaP9NUNr"
      },
      "source": [
        "### ParameterServerStrategy\n",
        "\n",
        "`tf.distribute.experimental.ParameterServerStrategy` 支持在多台机器上进行参数服务器训练。在此设置中，有些机器会被指定为工作进程，有些会被指定为参数服务器。模型的每个变量都会被放在参数服务器上。计算会被复制到所有工作进程的所有 GPU 中。\n",
        "\n",
        "就代码而言，该策略看起来与其他策略类似：\n",
        "\n",
        "```\n",
        "ps_strategy = tf.distribute.experimental.ParameterServerStrategy()\n",
        "```"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "zr1wPHYvOH0N"
      },
      "source": [
        "对于多工作进程训练，`TF_CONFIG` 需要在集群中指定参数服务器和工作进程的配置，有关详细信息，可以阅读[下面的 TF_CONFIG](#TF_CONFIG)。\n",
        "\n",
        "注：该策略仅适用于 Estimator API。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "t2XUdmIxKljq"
      },
      "source": [
        "### 其他策略\n",
        "\n",
        "除上述策略外，还有其他两种策略可能对使用 `tf.distribute` API 进行原型设计和调试有所帮助。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "UD5I1beTpc7a"
      },
      "source": [
        "#### 默认策略\n",
        "\n",
        "默认策略是一种分布式策略，当作用域内没有显式分布策略时就会出现。此策略会实现 `tf.distribute.Strategy` 接口，但只具有传递功能，不提供实际分布。例如，`strategy.run(fn)` 只会调用 `fn`。使用该策略编写的代码与未使用任何策略编写的代码完全一样。您可以将其视为“无运算”策略。\n",
        "\n",
        "默认策略是一种单一实例，无法创建它的更多实例。可通过在任意显式策略的作用域（与可用于在显式策略的作用域内获得当前策略的 API 相同）外使用 `tf.distribute.get_strategy()` 获得该策略。 "
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "ibHleFOOmPn9"
      },
      "outputs": [],
      "source": [
        "default_strategy = tf.distribute.get_strategy()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "EkxPl_5ImLzc"
      },
      "source": [
        "该策略有两个主要用途：\n",
        "\n",
        "- 它允许无条件编写可感知分布的库代码。例如，在优化器中，我们可以执行 `tf.distribute.get_strategy()` 并使用该策略来减少梯度，而它将始终返回一个我们可以在其上调用归约 API 的策略对象。\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "WECeRzUdT6bU"
      },
      "outputs": [],
      "source": [
        "# In optimizer or other library code\n",
        "# Get currently active strategy\n",
        "strategy = tf.distribute.get_strategy()\n",
        "strategy.reduce(\"SUM\", 1.)  # reduce some values"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "JURbH-pUT51B"
      },
      "source": [
        "- 与库代码类似，它可以用来在使用或不使用分布策略的情况下编写最终用户的程序，而无需条件逻辑。以下示例代码段展示了这一点："
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "O4Vmae5jmSE6"
      },
      "outputs": [],
      "source": [
        "if tf.config.list_physical_devices('gpu'):\n",
        "  strategy = tf.distribute.MirroredStrategy()\n",
        "else:  # use default strategy\n",
        "  strategy = tf.distribute.get_strategy() \n",
        "\n",
        "with strategy.scope():\n",
        "  # do something interesting\n",
        "  print(tf.Variable(1.))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "kTzsqN4lmJ0d"
      },
      "source": [
        "#### OneDeviceStrategy\n",
        "\n",
        "`tf.distribute.OneDeviceStrategy` 是一种会将所有变量和计算放在单个指定设备上的策略。\n",
        "\n",
        "```\n",
        "strategy = tf.distribute.OneDeviceStrategy(device=\"/gpu:0\")\n",
        "```\n",
        "\n",
        "此策略与默认策略在诸多方面存在差异。在默认策略中，与没有任何分布策略的 TensorFlow 运行相比，变量放置逻辑保持不变。但是当使用 `OneDeviceStrategy` 时，在其作用域内创建的所有变量都会被显式地放在指定设备上。此外，通过 `OneDeviceStrategy.run` 调用的任何函数也会被放在指定设备上。\n",
        "\n",
        "通过该策略分布的输入将被预提取到指定设备。而在默认策略中，则没有输入分布。\n",
        "\n",
        "与默认策略类似，在切换到实际分布到多个设备/机器的其他策略之前，也可以使用此策略来测试代码。这将比默认策略更多地使用分布策略机制，但不能像使用 `MirroredStrategy` 或 `TPUStrategy` 等策略那样充分发挥其作用。如果您想让代码表现地像没有策略，请使用默认策略。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "hQv1lm9UPDFy"
      },
      "source": [
        "目前为止，我们已经讨论了可用的不同策略以及如何将其实例化。在接下来的几个部分中，我们将讨论使用它们分布训练的不同方法。我们将在本指南中展示简短的代码段，并附上可以从头到尾运行的完整教程的链接。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "_mcuy3UhPcen"
      },
      "source": [
        "## 在 `tf.keras.Model.fit` 中使用 `tf.distribute.Strategy`\n",
        "\n",
        "我们已将 `tf.distribute.Strategy` 集成到 `tf.keras`（TensorFlow 对 [Keras API 规范](https://keras.io)的实现）。`tf.keras` 是用于构建和训练模型的高级 API。将该策略集成到 `tf.keras` 后端以后，您可以使用 `model.fit` 在 Keras 训练框架中无缝进行分布式训练。\n",
        "\n",
        "您需要对代码进行以下更改：\n",
        "\n",
        "1. 创建一个合适的 `tf.distribute.Strategy` 实例。\n",
        "2. 将 Keras 模型、优化器和指标的创建转移到 `strategy.scope` 中。\n",
        "\n",
        "我们支持所有类型的 Keras 模型：序贯模型、函数式模型和子类化模型。\n",
        "\n",
        "下面是一段代码，执行该代码会创建一个非常简单的带有一个密集层的 Keras 模型："
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "gbbcpzRnPZ6V"
      },
      "outputs": [],
      "source": [
        "mirrored_strategy = tf.distribute.MirroredStrategy()\n",
        "\n",
        "with mirrored_strategy.scope():\n",
        "  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])\n",
        "\n",
        "model.compile(loss='mse', optimizer='sgd')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "773EOxCRVlTg"
      },
      "source": [
        "在此示例中我们使用了 `MirroredStrategy`，因此我们可以在有多个 GPU 的机器上运行。`strategy.scope()` 会指示 Keras 使用哪个策略来进行分布式训练。我们可以通过在此作用域内创建模型/优化器/指标来创建分布式变量而非常规变量。设置完成后，您就可以像平常一样拟合模型。`MirroredStrategy` 负责将模型的训练复制到可用的 GPU 上，以及聚合梯度等。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "ZMmxEFRTEjH5"
      },
      "outputs": [],
      "source": [
        "dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)\n",
        "model.fit(dataset, epochs=2)\n",
        "model.evaluate(dataset)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "nofTLwyXWHK8"
      },
      "source": [
        "我们在这里使用了 `tf.data.Dataset` 来提供训练和评估输入。您还可以使用 Numpy 数组："
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Lqgd9SdxW5OW"
      },
      "outputs": [],
      "source": [
        "import numpy as np\n",
        "inputs, targets = np.ones((100, 1)), np.ones((100, 1))\n",
        "model.fit(inputs, targets, epochs=2, batch_size=10)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IKqaj7QwX0Zb"
      },
      "source": [
        "在上述两种情况（数据集或 Numpy）中，给定输入的每个批次都被平均分到了多个副本中。例如，如果对 2 个 GPU 使用 `MirroredStrategy`，大小为 10 的每个批次将被均分到 2 个 GPU 中，每个 GPU 每步会接收 5 个输入样本。如果添加更多 GPU，每个周期的训练速度就会更快。在添加更多加速器时通常需要增加批次大小，以便有效利用额外的计算能力。您还需要根据模型重新调整学习率。您可以使用 `strategy.num_replicas_in_sync` 获得副本数量。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "quNNTytWdGBf"
      },
      "outputs": [],
      "source": [
        "# Compute global batch size using number of replicas.\n",
        "BATCH_SIZE_PER_REPLICA = 5\n",
        "global_batch_size = (BATCH_SIZE_PER_REPLICA *\n",
        "                     mirrored_strategy.num_replicas_in_sync)\n",
        "dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)\n",
        "dataset = dataset.batch(global_batch_size)\n",
        "\n",
        "LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}\n",
        "learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "z1Muy0gDZwO5"
      },
      "source": [
        "### 目前支持的策略\n",
        "\n",
        "训练 API | MirroredStrategy | TPUStrategy | MultiWorkerMirroredStrategy | CentralStorageStrategy | ParameterServerStrategy\n",
        "--- | --- | --- | --- | --- | ---\n",
        "Keras API | 支持 | 支持 | 实验性支持 | 实验性支持 | 计划于 2.3 后支持\n",
        "\n",
        "### 示例和教程\n",
        "\n",
        "下列教程和示例完整演示了上述集成到 Keras 的过程：\n",
        "\n",
        "1. 使用 `MirroredStrategy` 训练 MNIST 的[教程](https://tensorflow.google.cn/tutorials/distribute/keras)。\n",
        "2. 使用 `MultiWorkerMirroredStrategy` 训练 MNIST 的[教程](https://tensorflow.google.cn/tutorials/distribute/multi_worker_with_keras)。\n",
        "3. 使用 `TPUStrategy` 训练 MNIST 的[指南](https://tensorflow.google.cn/guide/tpu#train_a_model_using_keras_high_level_apis)。\n",
        "4. 包含使用各种策略实现的最先进模型集合的 TensorFlow Model Garden [仓库](https://github.com/tensorflow/models/tree/master/official)。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IlYVC0goepdk"
      },
      "source": [
        "## 在自定义训练循环中使用 `tf.distribute.Strategy`\n",
        "\n",
        "如您所见，在 Keras `model.fit` 中使用 `tf.distribute.Strategy` 只需改动几行代码。再多花点功夫，您还可以在自定义训练循环中使用 `tf.distribute.Strategy`。\n",
        "\n",
        "如果您需要更多相对于使用 Estimator 或 Keras 时的灵活性和对训练循环的控制权，您可以编写自定义训练循环。例如，在使用 GAN 时，您可能会希望每轮使用不同数量的生成器或判别器步骤。同样，高级框架也不太适合强化学习训练。\n",
        "\n",
        "为了支持自定义训练循环，我们通过 `tf.distribute.Strategy` 类提供了一组核心方法。使用这些方法可能需要在开始时对代码进行轻微重构，但完成重构后，您只需更改策略实例就能够在 GPU、TPU 和多台机器之间进行切换。\n",
        "\n",
        "下面我们将用一个简短的代码段说明此用例，其中的简单训练样本使用与之前相同的 Keras 模型。\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "XNHvSY32nVBi"
      },
      "source": [
        "首先，在该策略的作用域内创建模型和优化器。这样可以确保使用此模型和优化器创建的任何变量都是镜像变量。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "W-3Bn-CaiPKD"
      },
      "outputs": [],
      "source": [
        "with mirrored_strategy.scope():\n",
        "  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])\n",
        "  optimizer = tf.keras.optimizers.SGD()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "mYkAyPeYnlXk"
      },
      "source": [
        "接下来，我们创建输入数据集并调用 `tf.distribute.Strategy.experimental_distribute_dataset`，以根据策略分布数据集。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "94BkvkLInkKd"
      },
      "outputs": [],
      "source": [
        "dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(\n",
        "    global_batch_size)\n",
        "dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "grzmTlSvn2j8"
      },
      "source": [
        "然后，我们定义一个训练步骤。我们将使用 `tf.GradientTape` 来计算梯度，并使用优化器来应用这些梯度以更新模型变量。要分布此训练步骤，我们加入一个 `train_step` 函数，并将此函数和从之前创建的 `dist_dataset` 获得的数据集输入一起传递给 `tf.distrbute.Strategy.run`："
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "NJxL5YrVniDe"
      },
      "outputs": [],
      "source": [
        "loss_object = tf.keras.losses.BinaryCrossentropy(\n",
        "  from_logits=True,\n",
        "  reduction=tf.keras.losses.Reduction.NONE)\n",
        "\n",
        "def compute_loss(labels, predictions):\n",
        "  per_example_loss = loss_object(labels, predictions)\n",
        "  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)\n",
        "\n",
        "def train_step(inputs):\n",
        "  features, labels = inputs\n",
        "\n",
        "  with tf.GradientTape() as tape:\n",
        "    predictions = model(features, training=True)\n",
        "    loss = compute_loss(labels, predictions)\n",
        "\n",
        "  gradients = tape.gradient(loss, model.trainable_variables)\n",
        "  optimizer.apply_gradients(zip(gradients, model.trainable_variables))\n",
        "  return loss\n",
        "\n",
        "@tf.function\n",
        "def distributed_train_step(dist_inputs):\n",
        "  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))\n",
        "  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,\n",
        "                         axis=None)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "yRL5u_NLoTvq"
      },
      "source": [
        "以上代码还需注意以下几点：\n",
        "\n",
        "1. 我们使用了 `tf.nn.compute_average_loss` 来计算损失。`tf.nn.compute_average_loss` 将每个样本的损失相加，然后将总和除以 global_batch_size。这很重要，因为稍后在每个副本上计算出梯度后，会通过对它们**求和**使其在副本中聚合。\n",
        "2. 我们使用了 `tf.distribute.Strategy.reduce` API 来聚合 `tf.distribute.Strategy.run` 返回的结果。`tf.distribute.Strategy.run` 会从策略中的每个本地副本返回结果，且有多种方法使用该结果。可以 `reduce` 它们以获得聚合值。还可以通过执行 `tf.distribute.Strategy.experimental_local_results` 获得包含在结果中的值的列表，每个本地副本一个列表。\n",
        "3. 当在一个分布策略作用域内调用 `apply_gradients` 时，它的行为会被修改。具体来说，在同步训练期间，在将梯度应用于每个并行实例之前，它会对梯度的所有副本求和。\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "o9k_6-6vpQ-P"
      },
      "source": [
        "最后，当我们定义完训练步骤后，就可以迭代 `dist_dataset`，并在循环中运行训练："
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Egq9eufToRf6"
      },
      "outputs": [],
      "source": [
        "for dist_inputs in dist_dataset:\n",
        "  print(distributed_train_step(dist_inputs))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "jK8eQXF_q1Zs"
      },
      "source": [
        "在上面的示例中，我们通过迭代 `dist_dataset` 为训练提供输入。我们还提供 `tf.distribute.Strategy.make_experimental_numpy_dataset` 以支持 Numpy 输入。您可以在调用 `tf.distribute.Strategy.experimental_distribute_dataset` 之前使用此 API 来创建数据集。\n",
        "\n",
        "迭代数据的另一种方法是显式地使用迭代器。当您希望运行给定数量的步骤而非迭代整个数据集时，可能会用到此方法。现在可以将上面的迭代修改为：先创建迭代器，然后在迭代器上显式地调用 `next` 以获得输入数据。\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "e5BEvR0-LJAc"
      },
      "outputs": [],
      "source": [
        "iterator = iter(dist_dataset)\n",
        "for _ in range(10):\n",
        "  print(distributed_train_step(next(iterator)))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "vDJO8mnypqBA"
      },
      "source": [
        "上面是使用 `tf.distribute.Strategy` API 来分布自定义训练循环最简单的情况。我们正在改进这些 API。由于此用例还需进一步调整才能适应您的代码，我们未来会发布单独的详细指南。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "BZjNwCt1qBdw"
      },
      "source": [
        "### 目前支持的策略\n",
        "\n",
        "训练 API | MirroredStrategy | TPUStrategy | MultiWorkerMirroredStrategy | CentralStorageStrategy | ParameterServerStrategy\n",
        ":-- | :-- | :-- | :-- | :-- | :--\n",
        "自定义训练循环 | 支持 | 支持 | 实验性支持 | 实验性支持 | 计划于 2.3 后支持\n",
        "\n",
        "### 示例和教程\n",
        "\n",
        "下面是在自定义训练循环中使用分布策略的一些示例：\n",
        "\n",
        "1. 使用 `MirroredStrategy` 训练 MNIST 的[教程](https://tensorflow.google.cn/tutorials/distribute/custom_training)。\n",
        "2. 使用 `TPUStrategy` 训练 MNIST 的[指南](https://tensorflow.google.cn/guide/tpu#train_a_model_using_custom_training_loop)。\n",
        "3. 包含使用各种策略实现的最先进模型集合的 TensorFlow Model Garden [仓库](https://github.com/tensorflow/models/tree/master/official)。\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "nO0hmFCRoIll"
      },
      "source": [
        "## 在 Estimator 中使用 `tf.distribute.Strategy`（有限支持）\n",
        "\n",
        "`tf.estimator` 是分布式训练 TensorFlow API，最初支持异步参数服务器方法。与 Keras 类似，我们已将 `tf.distribute.Strategy` 集成到 `tf.Estimator`。如果您正在使用 Estimator 进行训练，那么您只需改动少量代码即可轻松转换为分布式训练。借助此功能，Estimator 用户现在可以在多个 GPU 和多个工作进程以及 TPU 上进行同步分布式训练。但是，Estimator 的这种支持是有限的。有关详细信息，请参阅下文[目前支持的策略](#estimator_support)部分。\n",
        "\n",
        "在 Estimator 中使用 `tf.distribute.Strategy` 的方法与 Keras 略有不同。现在我们不使用 `strategy.scope`，而是将策略对象传递到 Estimator 的 [RunConfig](https://tensorflow.google.cn/api_docs/python/tf/estimator/RunConfig) 中。\n",
        "\n",
        "以下代码段使用预制 Estimator `LinearRegressor` 和 `MirroredStrategy` 展示了这种情况：\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "oGFY5nW_B3YU"
      },
      "outputs": [],
      "source": [
        "mirrored_strategy = tf.distribute.MirroredStrategy()\n",
        "config = tf.estimator.RunConfig(\n",
        "    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)\n",
        "regressor = tf.estimator.LinearRegressor(\n",
        "    feature_columns=[tf.feature_column.numeric_column('feats')],\n",
        "    optimizer='SGD',\n",
        "    config=config)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "n6eSfLN5RGY8"
      },
      "source": [
        "我们在这里使用了预制 Estimator，但同样的代码也适用于自定义 Estimator。`train_distribute` 决定训练如何分布，`eval_distribute` 决定评估如何分布。这是与 Keras 的另一个区别，在 Keras 中，我们会对训练和评估使用相同的策略。\n",
        "\n",
        "现在，我们可以使用输入函数来训练和评估这个 Estimator：\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "2ky2ve2PB3YP"
      },
      "outputs": [],
      "source": [
        "def input_fn():\n",
        "  dataset = tf.data.Dataset.from_tensors(({\"feats\":[1.]}, [1.]))\n",
        "  return dataset.repeat(1000).batch(10)\n",
        "regressor.train(input_fn=input_fn, steps=10)\n",
        "regressor.evaluate(input_fn=input_fn, steps=10)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "hgaU9xQSSk2x"
      },
      "source": [
        "需要在这里强调的 Estimator 和 Keras 的另一个区别是输入处理。在 Keras 中，我们提到过数据集的每个批次都会在多个副本之间自动拆分。但在 Estimator 中，批次不会自动拆分，也不会在不同的工作进程之间对数据进行分片处理。您可以完全控制数据在工作进程和设备之间的分布方式，而且您必须提供 `input_fn` 来指定数据的分布方式。\n",
        "\n",
        "每个工作进程都会调用一次 `input_fn`，从而为每个工作进程提供一个数据集。然后数据集中的一个批次会被馈送到此工作进程上的一个副本，因此，1 个工作进程上的 N 个副本要使用 N 个批次。换句话说，`input_fn` 返回的数据集应提供大小为 `PER_REPLICA_BATCH_SIZE` 的批次。步骤的全局批次大小可通过 `PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync` 获得。\n",
        "\n",
        "在进行多工作进程训练时，您应该将数据拆分至各个工作进程，或者在每个工作进程上打乱随机种子的顺序。您可以在[使用 Estimator 进行多工作进程训练](https://render.githubusercontent.com/tutorials/distribute/multi_worker_with_estimator.ipynb)中参阅有关此操作的示例。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "G3ieQKfWZhhL"
      },
      "source": [
        "同样，您也可以使用多工作进程和参数服务器策略。代码保持不变，但需要使用 `tf.estimator.train_and_evaluate`，并为集群中运行的每个二进制文件设置 `TF_CONFIG` 环境变量。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "A_lvUsSLZzVg"
      },
      "source": [
        "<a name=\"estimator_support\"></a>\n",
        "\n",
        "### 目前支持的策略\n",
        "\n",
        "除 `TPUStrategy` 外，所有策略都对使用 Estimator 的训练提供有限支持。基本训练和评估应该可以正常运行，但如基架之类的许多高级功能尚不可用。此集成中可能还存在许多错误。目前，我们不打算主动改进此支持，而是专注于对 Keras 和自定义训练循环的支持。如果可能，您应该会更喜欢在这些 API 中使用 `tf.distribute`。\n",
        "\n",
        "训练 API | MirroredStrategy | TPUStrategy | MultiWorkerMirroredStrategy | CentralStorageStrategy | ParameterServerStrategy\n",
        ":-- | :-- | :-- | :-- | :-- | :--\n",
        "Estimator API | 有限支持 | 不支持 | 有限支持 | 有限支持 | 有限支持\n",
        "\n",
        "### 示例和教程\n",
        "\n",
        "下列示例展示了 Estimator 中各种策略的完整用法：\n",
        "\n",
        "1. 使用 `MultiWorkerMirroredStrategy` 通过多个工作进程训练 MNIST 的[使用 Estimator 进行多工作进程训练](https://render.githubusercontent.com/tutorials/distribute/multi_worker_with_estimator.ipynb)。\n",
        "2. 使用 Kubernetes 模板在 tensorflow/ecosystem 中进行多工作进程训练的[完整示例](https://github.com/tensorflow/ecosystem/tree/master/distribution_strategy)。本示例从 Keras 模型开始，然后使用 `tf.keras.estimator.model_to_estimator` API 将其转换为 Estimator。\n",
        "3. 可以使用 `MirroredStrategy` 或 `MultiWorkerMirroredStrategy` 进行训练的官方 [ResNet50](https://github.com/tensorflow/models/blob/master/official/vision/image_classification/resnet_imagenet_main.py) 模型。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Xk0JdsTHyUnE"
      },
      "source": [
        "## 其他主题\n",
        "\n",
        "在此部分，我们将介绍与多个用例相关的主题。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "cP6BUIBtudRk"
      },
      "source": [
        "<a name=\"TF_CONFIG\"></a>\n",
        "\n",
        "### 设置 TF_CONFIG 环境变量\n",
        "\n",
        "对于多工作进程训练来说，如前所述，您需要为每个在集群中运行的二进制文件设置 `TF_CONFIG` 环境变量。`TF_CONFIG` 环境变量是一个 JSON 字符串，它指定了构成集群的任务、它们的地址，以及每个任务在集群中的角色。我们在 [tensorflow/ecosystem](https://github.com/tensorflow/ecosystem) 仓库中提供了一个 Kubernetes 模板，可为您的训练任务设置 `TF_CONFIG`。\n",
        "\n",
        "TF_CONFIG 有两个组件：cluster 和 task。cluster 会提供有关训练集群的信息，这是一个由不同类型的作业（如工作进程）组成的字典。在多工作进程训练中，通常会有一个工作进程除了要完成常规工作进程的工作之外，还要承担更多责任，如保存检查点和为 TensorBoard 编写摘要文件。此类工作进程称为 \"chief\" 工作进程，习惯上会将索引为 0 的工作进程指定为 chief 工作进程（实际上这是 tf.distribute.Strategy 的实现方式）。另一方面，task 会提供有关当前任务的信息。第一个组件 cluster 对于所有工作进程都相同，而第二个组件 task 在每个工作进程上均不相同，并指定了该工作进程的类型和索引。\n",
        "\n",
        "`TF_CONFIG` 的示例如下：\n",
        "\n",
        "```\n",
        "os.environ[\"TF_CONFIG\"] = json.dumps({     \"cluster\": {         \"worker\": [\"host1:port\", \"host2:port\", \"host3:port\"],         \"ps\": [\"host4:port\", \"host5:port\"]     },    \"task\": {\"type\": \"worker\", \"index\": 1} })\n",
        "```\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "fezd3aF8wj9r"
      },
      "source": [
        "此 `TF_CONFIG` 指定了集群中包含三个工作进程和两个 ps 任务，以及它们的主机和端口。\"task\" 部分指定当前任务在集群中的角色，即 worker 1（第二个工作进程）。集群中的有效角色是 \"chief\"、\"worker\"、\"ps\" 和 \"evaluator\"。除使用 `tf.distribute.experimental.ParameterServerStrategy` 时外，不应有 \"ps\" 作业。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "GXIbqSW-sFVg"
      },
      "source": [
        "## 后续计划\n",
        "\n",
        "我们正在积极开发 `tf.distribute.Strategy`。欢迎您试用，并通过 [GitHub 问题](https://github.com/tensorflow/tensorflow/issues/new)提供反馈。"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "collapsed_sections": [
        "Tce3stUlHN0L"
      ],
      "name": "distributed_training.ipynb",
      "toc_visible": true
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
